package models

import (
	"acs/taskprocess"
	"fmt"
	"log"
	"sync"

	"gopkg.in/vmihailenco/msgpack.v2"

	"github.com/bitly/go-nsq"
)

// NsqProductor pairs a go-nsq producer with a mutex. The publish helpers in
// this file (MulPushUPV2NSQ and PushUPV2NSQ) hold lock for the duration of
// each publish call.
type NsqProductor struct {
	// productor is assigned in InitNsqServer; it is nil before that runs.
	productor *nsq.Producer
	// lock is taken around every publish issued through productor.
	lock      sync.Mutex
}

// nsqProductor is the package-level producer state used by the publish helpers.
var nsqProductor NsqProductor

// InitNsqServer builds the package-level NSQ producer and consumer from a
// default go-nsq config.
//
// The producer targets the nsqd at
// Conf.NsqUpv.Nsq_upv_server_ip:Conf.NsqUpv.Nsq_upv_server_port and is pinged
// immediately after creation. If creating or pinging the producer fails, this
// function does NOT return: log.Panicln logs the message and then panics.
//
// Only a failure to create the consumer (topic
// Conf.NsqUpv.Nsq_user_app_version_topic, channel "roland-channel") is
// reported through the returned error. The consumer is only created here; it
// is connected by HandleUPVFromNSQ.
func InitNsqServer() (err error) {
	config := nsq.NewConfig()
	// Disabled: binding the local address of the connection.
	// config.LocalAddr, err = net.ResolveTCPAddr("tcp", Conf.NsqUpv.Nsq_upv_server_ip+":4150")
	// if err != nil {
	// 	log.Panicln("failed to connect to nsq server:" + err.Error())
	// 	return err
	// }

	nsqdAddr := Conf.NsqUpv.Nsq_upv_server_ip + ":" + Conf.NsqUpv.Nsq_upv_server_port

	// Producer side. log.Panicln never returns, so no explicit return
	// statement follows it.
	if nsqProductor.productor, err = nsq.NewProducer(nsqdAddr, config); err != nil {
		log.Panicln("failed to connect to nsq server:" + err.Error())
	}
	if err = nsqProductor.productor.Ping(); err != nil {
		log.Panicln("failed to connect to nsq server:" + err.Error())
	}

	// Consumer side: create a new consumer. err is nil on success, so it can
	// be returned directly.
	nsqConsumer.consumer, err = nsq.NewConsumer(Conf.NsqUpv.Nsq_user_app_version_topic, "roland-channel", config)
	return err
}

// MulPushUPV2NSQ pushes several payloads together: every element of
// batchUPVList is published to the Conf.NsqUpv.Nsq_user_app_version_topic
// topic with a single MultiPublish call.
//
// An empty batch is a no-op and returns nil. An error is returned when the
// producer has not been initialised (InitNsqServer has not run) or when the
// publish itself fails.
func MulPushUPV2NSQ(batchUPVList [][]byte) (err error) {
	// Nothing to send, so skip the round trip.
	// NOTE(review): nsqd is expected to reject a zero-message MPUB; confirm
	// against the deployed nsqd version.
	if len(batchUPVList) == 0 {
		return nil
	}

	nsqProductor.lock.Lock()
	// Deferred so the mutex is released even if the publish call panics.
	defer nsqProductor.lock.Unlock()

	if nsqProductor.productor == nil {
		return fmt.Errorf("MulPushUPV2NSQ: nsq producer is not initialized")
	}
	return nsqProductor.productor.MultiPublish(Conf.NsqUpv.Nsq_user_app_version_topic, batchUPVList)
}

// PushUPV2NSQ pushes the user's current patch version array (already
// serialised into batchUPVList) onto the NSQ topic
// Conf.NsqUpv.Nsq_user_app_version_topic, where it waits to be processed.
//
// An error is returned when the producer has not been initialised
// (InitNsqServer has not run) or when the publish itself fails.
func PushUPV2NSQ(batchUPVList []byte) (err error) {
	nsqProductor.lock.Lock()
	// Deferred so the mutex is released even if the publish call panics.
	defer nsqProductor.lock.Unlock()

	if nsqProductor.productor == nil {
		return fmt.Errorf("PushUPV2NSQ: nsq producer is not initialized")
	}
	return nsqProductor.productor.Publish(Conf.NsqUpv.Nsq_user_app_version_topic, batchUPVList)
}

// NsqConsumer pairs a go-nsq consumer with a mutex. HandleUPVFromNSQ holds
// lock while it registers a handler on the consumer and connects it.
type NsqConsumer struct {
	// consumer is assigned in InitNsqServer; it is nil before that runs or
	// when consumer creation failed there.
	consumer *nsq.Consumer
	// lock serialises handler registration and connection in HandleUPVFromNSQ.
	lock     sync.Mutex
}

// nsqConsumer is the package-level consumer state used by HandleUPVFromNSQ.
var nsqConsumer NsqConsumer

// consumerHandler is the message handler that HandleUPVFromNSQ registers on
// the consumer. Besides handling messages it carries a stop-signal channel.
type consumerHandler struct {
	// stopChan receives a single true and is then closed by Stop; ShouldStop
	// exposes it as a receive-only channel.
	stopChan chan bool
}

// HandleMessage decodes msg.Body as a msgpack-encoded taskprocess.TaskInQueue
// and prints the task type together with the raw body.
//
// A body that cannot be decoded is reported on stdout and the method still
// returns nil. go-nsq re-queues a message whenever its handler returns a
// non-nil error, and retrying a payload that will never decode is pointless,
// so the message is acknowledged instead.
func (*consumerHandler) HandleMessage(msg *nsq.Message) error {
	taskProtoNew := taskprocess.TaskInQueue{}
	if err := msgpack.Unmarshal(msg.Body, &taskProtoNew); err != nil {
		// Do not print the zero-valued task as if it were real data.
		fmt.Println("HandleMessage: failed to decode message body:" + err.Error())
		return nil
	}

	fmt.Println(taskProtoNew.Type, ":::::HandleMessage::::::"+string(msg.Body))
	return nil
}

// Stop signals shutdown by placing true on the stop channel and then closing
// it. A receiver on ShouldStop therefore sees one true value, after which
// further receives return false immediately because the channel is closed.
//
// Stop must be called at most once: a second call panics when it sends on the
// already-closed channel.
func (c *consumerHandler) Stop() {
	done := c.stopChan
	done <- true
	close(done)
}
// ShouldStop returns the handler's stop channel as a receive-only channel, so
// callers can wait for the signal sent by Stop but cannot send on or close it.
func (c *consumerHandler) ShouldStop() <-chan bool {
	var stopSignal <-chan bool = c.stopChan
	return stopSignal
}

// NewConsumerHanlder returns a consumerHandler whose stop channel has a
// one-slot buffer, so the single send performed by Stop does not need a
// receiver to be waiting.
func NewConsumerHanlder() *consumerHandler {
	handler := new(consumerHandler)
	handler.stopChan = make(chan bool, 1)
	return handler
}

// HandleUPVFromNSQ registers a fresh consumerHandler on the package-level
// consumer and connects that consumer to the nsqd at
// Conf.NsqUpv.Nsq_upv_server_ip:Conf.NsqUpv.Nsq_upv_server_port.
//
// Returns:
//   - the registered handler and a nil error on success;
//   - the registered handler and the connection error when connecting fails
//     (the error is also printed to stdout);
//   - a nil handler and an error when the consumer was never created, i.e.
//     InitNsqServer has not run or failed while creating the consumer.
func HandleUPVFromNSQ() (*consumerHandler, error) {
	nsqConsumer.lock.Lock()
	defer nsqConsumer.lock.Unlock()

	// InitNsqServer leaves the consumer nil when nsq.NewConsumer fails; report
	// that instead of dereferencing a nil pointer below.
	if nsqConsumer.consumer == nil {
		return nil, fmt.Errorf("HandleUPVFromNSQ: nsq consumer is not initialized")
	}

	var consumerhandler = NewConsumerHanlder()
	nsqConsumer.consumer.AddHandler(consumerhandler)
	err := nsqConsumer.consumer.ConnectToNSQD(Conf.NsqUpv.Nsq_upv_server_ip + ":" + Conf.NsqUpv.Nsq_upv_server_port)
	if err != nil {
		fmt.Println("HandleUPVFromNSQ:" + err.Error())
	}

	return consumerhandler, err
}
